package com.imooc.mq.rabbitmq;

import com.imooc.utils.JsonUtil;
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.DefaultTransactionDefinition;

/**
 * Publishes messages through the injected {@link AmqpTemplate} to the queues
 * and exchanges whose names are declared in {@code MQConfig}.
 *
 * <p>The queue-oriented senders ({@link #send}, {@link #sendQueueMessage},
 * {@link #sendJsmMessage}, {@link #sendNewMessage}) first turn the payload into
 * a string with {@code JsonUtil.obj2String} and publish that string. The
 * exchange-oriented senders ({@link #sendTopic}, {@link #sendFanout},
 * {@link #sendHeader}) publish the payload as described on each method.
 */
@Service
public class MQSender {

	private static final Logger log = LoggerFactory.getLogger(MQSender.class);

	@Autowired
	private AmqpTemplate amqpTemplate;

	// Not used by any active code path. It was only referenced by an earlier
	// programmatic-transaction experiment in sendQueueMessage; the injection is
	// kept so the bean's wiring requirements do not change.
	@Autowired
	private PlatformTransactionManager transactionManager;

	/**
	 * Serializes {@code message} and publishes it with {@code MQConfig.QUEUE}
	 * as the routing key.
	 *
	 * @param message payload to serialize and publish
	 */
	public void send(Object message) {
		publishSerialized("queue", MQConfig.QUEUE, message);
	}

	/**
	 * Serializes {@code message} and publishes it with
	 * {@code MQConfig.QUEUE_MSG} as the routing key.
	 *
	 * <p>The original author intended this send to take part in an external
	 * Spring transaction so that a failure would stop the message from being
	 * committed. Annotating the method with {@code @Transactional}
	 * (REPEATABLE_READ, REQUIRED, rollbackFor = Exception.class) was tried and
	 * noted as NOT rolling the message back. A second experiment wrapped the
	 * send in a programmatic transaction obtained from the
	 * {@code PlatformTransactionManager}; its outcome was not recorded. Neither
	 * approach is active: the message is published without any transaction
	 * handling in this method.
	 *
	 * @param message payload to serialize and publish
	 */
	public void sendQueueMessage(Object message) {
		publishSerialized("QUEUE_MSG", MQConfig.QUEUE_MSG, message);
	}

	/**
	 * Serializes {@code message} and publishes it with
	 * {@code MQConfig.QUEUE_JMS} as the routing key.
	 *
	 * @param message payload to serialize and publish
	 */
	public void sendJsmMessage(Object message) {
		publishSerialized("QUEUE_JMS", MQConfig.QUEUE_JMS, message);
	}

	/**
	 * Serializes {@code message} and publishes it with
	 * {@code MQConfig.QUEUE_NEW} as the routing key.
	 *
	 * @param message payload to serialize and publish
	 */
	public void sendNewMessage(Object message) {
		publishSerialized("QUEUE_NEW", MQConfig.QUEUE_NEW, message);
	}

	/**
	 * Publishes two messages to {@code MQConfig.TOPIC_EXCHANGE}: the string
	 * form of {@code msg} suffixed with "1" under routing key
	 * {@code topic.key1}, and suffixed with "2" under {@code topic.key2}.
	 *
	 * @param msg payload; its string form is used as the message text
	 */
	public void sendTopic(Object msg) {
		log.info("send topic message:{}", msg);
		amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key1", msg + "1");
		amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key2", msg + "2");
	}

	/**
	 * Publishes {@code msg} to {@code MQConfig.FANOUT_EXCHANGE} with an empty
	 * routing key.
	 *
	 * @param msg payload handed to the template unchanged
	 */
	public void sendFanout(Object msg) {
		log.info("send fanout message:{}", msg);
		amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", msg);
	}

	/**
	 * Publishes the string form of {@code msg} to
	 * {@code MQConfig.HEADERS_EXCHANGE} with an empty routing key and the
	 * message headers {@code header1=value1} and {@code header2=value2}.
	 *
	 * @param msg payload; must not be {@code null} because {@code toString()}
	 *            is invoked on it
	 */
	public void sendHeader(Object msg) {
		// Previously logged "send fanout message:", copied from sendFanout.
		log.info("send header message:{}", msg);
		MessageProperties properties = new MessageProperties();
		properties.setHeader("header1", "value1");
		properties.setHeader("header2", "value2");
		// Encode explicitly as UTF-8 so the bytes on the wire do not depend on
		// the JVM's platform default charset.
		byte[] body = msg.toString().getBytes(StandardCharsets.UTF_8);
		Message obj = new Message(body, properties);
		amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE, "", obj);
	}

	/**
	 * Shared implementation of the queue senders: serializes the payload, logs
	 * it, and publishes the serialized string.
	 *
	 * @param queueLabel label inserted into the log line to identify the queue
	 * @param routingKey routing key passed to the template
	 * @param message    payload to serialize with {@code JsonUtil.obj2String}
	 */
	private void publishSerialized(String queueLabel, String routingKey, Object message) {
		String serialized = JsonUtil.obj2String(message);
		log.info("我在 {} 中发送了一条消息  message={}", queueLabel, serialized);
		amqpTemplate.convertAndSend(routingKey, serialized);
	}
}
